package com.vaye.im.websocket;

import cn.dev33.satoken.session.SaSession;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.net.Ipv4Util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.vaye.common.constant.CacheConstant;
import com.vaye.common.constant.ImConstant;
import com.vaye.common.enums.TopicTypeEnums;
import com.vaye.common.utils.IPUtils;
import com.vaye.common.utils.RedisUtil;
import com.vaye.common.utils.SpringContextUtils;
import com.vaye.common.utils.UserSessionUtils;
import com.vaye.im.dto.TopicMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Chat-room WebSocket endpoint (upgraded group chat).
 *
 * <p>Notes on the two ways a {@code @ServerEndpoint} can receive parameters:
 * <ol>
 *   <li>Path template, e.g. {@code value = "/ws/{userId}"} together with
 *       {@code onOpen(@PathParam("userId") String userId, Session session)}.
 *       The client must append the value after the slash
 *       (e.g. {@code ws://localhost:7889/productWebSocket/123}), otherwise the request gets a 404.</li>
 *   <li>Query string, e.g. {@code value = "/ws"} together with {@code onOpen(Session session)} and
 *       {@code session.getRequestParameterMap()} to read parameters such as {@code ?userId=123}.</li>
 * </ol>
 *
 * <p>This endpoint uses the path-template form with a {@code roomId} segment. The id of the
 * logged-in user is read from the session's user properties under {@link ImConstant#LOGINID}.
 *
 * @author wangzhiyong
 * @author bart
 * @date 2022-04-04 08:24
 */
@Slf4j
@Component
// NOTE(review): the configurator presumably stores the login id in the session user properties during the handshake - confirm.
@ServerEndpoint(value = "/chat2/{roomId}",configurator = WebSocketConfigurator.class)
public class ProductWebSocket31 {

    /** User-property key set once a session has fully joined its room; lets onClose skip sessions that never joined. */
    private static final String JOINED_ATTR = "ProductWebSocket31.joinedRoom";

    /** Online user counters keyed by room id; static, so shared by all instances of this class. */
    private static final ConcurrentHashMap<String, AtomicInteger> rootNumMap = new ConcurrentHashMap<>();

    // Obtained through SpringContextUtils instead of field injection.
    private final StringRedisTemplate stringRedisTemplate = SpringContextUtils.getBean(StringRedisTemplate.class);

    /**
     * Called when a client connects: registers the session, joins the room, bumps the room's
     * online counter and announces the newcomer to the room.
     *
     * @param roomId  room id taken from the endpoint path
     * @param session the newly opened WebSocket session
     * @throws IllegalArgumentException if {@code roomId} is empty
     */
    @OnOpen
    public void onOpen(@PathParam("roomId") String roomId, Session session){
        log.info("ProductWebSocket31开始建立链接");
        // Validate before touching the login session so a bad request fails with the intended error.
        requireRoomId(roomId);
        Long userId = loginIdOf(session);
        String nick = ((JSONObject) StpUtil.getSessionByLoginId(userId).getDataMap().get(SaSession.USER)).getString("nick");
        // Register the session
        UserSessionUtils.add(userId,session,nick);
        // Join the room
        UserSessionUtils.addRoom(roomId,userId);
        int online = addOnlineCount(roomId);
        // From here on onClose has something to undo.
        session.getUserProperties().put(JOINED_ATTR, Boolean.TRUE);
        log.info("websocket 新客户端连入，用户id：{},房间id={}",userId,roomId);
        log.info("room={}当前房间在线人数：{}人",roomId,online);
        multicastMessage(roomId,null,"欢迎" +UserSessionUtils.getNickName(userId)+"加入房间");
    }

    /**
     * Called when the server receives a text message: relays it to the other sessions in the
     * room and publishes it on the Redis chat topic.
     *
     * @param roomId  room id taken from the endpoint path
     * @param msg     text sent by the client
     * @param session session the message arrived on
     * @throws IllegalArgumentException if {@code roomId} is empty
     */
    @OnMessage
    public void onMessage(@PathParam("roomId") String roomId,String msg, Session session) {
        log.info("ProductWebSocket31.onMessage");
        requireRoomId(roomId);
        Long userId = loginIdOf(session);
        multicastMessage(roomId,userId, msg);

        TopicMsg topicMsg = new TopicMsg();
        topicMsg.setType(TopicTypeEnums.GROUP_CHAT.getCode());
        topicMsg.setFromUserId(String.valueOf(userId));
        topicMsg.setRoomId(roomId);
        topicMsg.setMsg(msg);
        topicMsg.setIp(IPUtils.getLocalIP());
        stringRedisTemplate.convertAndSend(CacheConstant.WS_CHAT_TOPIC, JSON.toJSONString(topicMsg));
    }

    /**
     * Called when a connection closes: leaves the room, decrements the room's online counter
     * and tells the room that the user left. Sessions that never completed {@link #onOpen}
     * are skipped so they cannot decrement a counter they did not increment.
     *
     * @param roomId  room id taken from the endpoint path
     * @param session the session being closed
     * @throws IllegalArgumentException if {@code roomId} is empty
     */
    @OnClose
    public void onClose(@PathParam("roomId") String roomId, Session session) {
        requireRoomId(roomId);
        Long userId = loginIdOf(session);
        if (!Boolean.TRUE.equals(session.getUserProperties().remove(JOINED_ATTR))) {
            log.warn("websocket closed before joining a room, skipping cleanup, userId={}, roomId={}", userId, roomId);
            return;
        }
        log.info("一个客户端关闭连接，用户id：{},房间id={}",userId,roomId);
        UserSessionUtils.exitRoom(roomId,userId);
        log.info("room={}当前房间在线人数：{}人",roomId,subOnlineCount(roomId));
        multicastMessage(roomId,null,UserSessionUtils.getNickName(userId)+"退出房间");
    }

    /**
     * Called when the WebSocket layer reports an error.
     *
     * @param session session the error occurred on
     * @param error   the reported error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // Log through SLF4J with the throwable so the stack trace lands in the application log.
        log.error("websocket出现错误", error);
    }

    /**
     * Sends a text message to the sessions of a room.
     *
     * @param roomId room whose sessions receive the message
     * @param userId sender id; when non-null the message is prefixed with the sender's nickname
     *               and the sender's own session is skipped; when null the message is sent
     *               unchanged to every session
     * @param msg    text to send
     */
    public void multicastMessage(String roomId,Long userId,String msg){
        List<Pair<Long, Session>> roomSessions = UserSessionUtils.getRoomSessions(roomId);
        if (CollectionUtils.isEmpty(roomSessions)) {
            log.error("room={}房间0人在线",roomId);
            // Nothing to send to; also avoids dereferencing a null list.
            return;
        }
        String finalMsg = userId == null ? msg : String.format("%s:%s",UserSessionUtils.getNickName(userId),msg);
        for (Pair<Long, Session> member : roomSessions) {
            if (userId != null && userId.equals(member.getFirst())) {
                continue;
            }
            Session target = member.getSecond();
            if (!target.isOpen()) {
                // A session that closed meanwhile must not abort delivery to the rest of the room.
                continue;
            }
            try {
                target.getBasicRemote().sendText(finalMsg);
            } catch (IOException exception) {
                log.warn("failed to send message to user {} in room {}", member.getFirst(), roomId, exception);
            }
        }
    }

    /**
     * Decrements the online counter of a room. The counter never goes below zero and the
     * entry is removed once the room is empty.
     *
     * @param roomId room id
     * @return the number of users still online in the room; 0 if the room has no counter
     */
    public static synchronized int subOnlineCount(String roomId) {
        AtomicInteger counter = rootNumMap.get(roomId);
        if (counter == null) {
            return 0;
        }
        int remaining = counter.decrementAndGet();
        if (remaining <= 0) {
            // Drop empty rooms so the map does not grow with every room id ever used.
            rootNumMap.remove(roomId);
            remaining = 0;
        }
        log.debug("sub = {}", remaining);
        return remaining;
    }

    /**
     * Returns the online counter of a room.
     *
     * @param roomId room id
     * @return the number of users online in the room; 0 if the room has no counter
     */
    public static synchronized int getOnlineCount(String roomId) {
        AtomicInteger counter = rootNumMap.get(roomId);
        int current = counter == null ? 0 : counter.get();
        log.debug("get = {}", current);
        return current;
    }

    /**
     * Increments the online counter of a room, creating the counter on first use.
     *
     * @param roomId room id
     * @return the number of users online in the room after the increment
     */
    public static synchronized int addOnlineCount(String roomId) {
        return rootNumMap.computeIfAbsent(roomId, key -> new AtomicInteger()).incrementAndGet();
    }

    /** Rejects an empty room id with the same log line and exception for every callback. */
    private static void requireRoomId(String roomId) {
        if (StringUtils.isEmpty(roomId)) {
            log.error("websocket连接 缺少参数");
            throw new IllegalArgumentException("websocket连接 缺少参数");
        }
    }

    /** Reads the login id stored in the session's user properties; null when none is stored. */
    private static Long loginIdOf(Session session) {
        Map<String, Object> userProperties = session.getUserProperties();
        return (Long) userProperties.get(ImConstant.LOGINID);
    }
}
